Let's dig into non-parametric testing, starting with why it is even needed in the first place.
import pandas as pd
import numpy as np
from scipy.stats import bernoulli, binom, norm, ks_2samp, ttest_ind
from scipy import integrate
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rc, animation
from IPython.core.display import display, HTML
from mpl_toolkits.mplot3d import Axes3D
from matplotlib.ticker import LinearLocator, FormatStrFormatter
from _plotly_future_ import v4_subplots
from plotly import tools
import plotly
import plotly.graph_objs as go
from plotly.offline import iplot
import plotly.figure_factory as ff
# Notebook display setup: render matplotlib figures inline at retina resolution.
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
# Seaborn styling applies to any matplotlib-based plots below.
sns.set(style="white", palette="husl")
sns.set_context("talk")
sns.set_style("ticks")
# Switch consumed by plot_util: 'prod' embeds figures as standalone HTML,
# anything else falls back to plotly's offline iplot renderer.
plotting_env = 'prod'
def plot_util(fig):
    """Render a plotly figure.

    In 'prod' (see the module-level `plotting_env` flag) the figure is
    serialized to a standalone HTML fragment and displayed inline; otherwise
    it is handed to plotly's offline iplot renderer.
    """
    if plotting_env != 'prod':
        plotly.offline.iplot(fig)
        return
    # Serialize to self-contained HTML (plotly.js bundled) and display inline.
    display(HTML(plotly.io.to_html(fig, include_plotlyjs=True)))
# Example 1: two unit-sd gaussians with different means (0 vs 1).
# NOTE(review): np.random.normal's third positional argument is the standard
# deviation (scale), not the variance; 'var1'/'var2' are passed as sd both
# here and to norm.pdf/cdf below, so the figure is at least self-consistent.
# No RNG seed is set, so sampled values (and downstream test statistics) vary per run.
sample_size = 200
mean1 = 0
mean2 = 1
var1 = 1
var2 = 1
samp1 = np.random.normal(mean1, var1, sample_size)
samp2 = np.random.normal(mean2, var2, sample_size)
# Left subplot: overlaid histograms of the two samples.
trace1a = go.Histogram(
x=samp1,
nbinsx=50,
name="Sample 1",
marker_color='blue',
opacity=0.7
)
trace2a = go.Histogram(
x=samp2,
nbinsx=50,
name="Sample 2",
marker_color='crimson',
opacity=0.7
)
# Right subplot: the corresponding population PDFs (CDFs computed but unused here).
x_axis = np.arange(-3, 4, 0.01)
y_samp1_pdf = norm.pdf(x_axis, mean1, var1)
y_samp2_pdf = norm.pdf(x_axis, mean2, var2)
y_samp1_cdf = norm.cdf(x_axis, mean1, var1)
y_samp2_cdf = norm.cdf(x_axis, mean2, var2)
trace1b = go.Scatter(
x=x_axis,
y=y_samp1_pdf,
marker = dict(color='blue'),
fill='tozeroy',
fillcolor='rgba(0, 0, 255, 0.2)',
name="Sample 1",
showlegend=False
)
trace2b = go.Scatter(
x=x_axis,
y=y_samp2_pdf,
marker = dict(color='red'),
fill='tozeroy',
fillcolor='rgba(255, 0, 0, 0.2)',
name="Sample 2",
showlegend=False
)
# Assemble the 1x2 figure and apply shared styling.
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 Histograms', 'Sample 1 & 2 PDFs')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_traces(marker=dict(line=dict(width=1, color='black')))
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
barmode='overlay',
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis=dict(title="Count"),
xaxis2=dict(title='X'),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
We can perform a traditional t test to determine if the samples are drawn from different distributions:
# Two-sample t test: with a genuine mean shift this is its ideal case.
t2, p2 = ttest_ind(samp1, samp2)
display(f"t: {t2}, p: {p2}")
The t test was able to nicely identify that these samples were in fact drawn from different distributions. For all of the detail behind the inner workings of the t test, I recommend checking out my post on statistical inference.
# Example 2: identical means (1) but very different spreads (sd 1 vs 5).
# NOTE(review): as elsewhere in this notebook, 'var2' = 5 is actually used as
# the *standard deviation* by both np.random.normal and norm.pdf/cdf, so the
# sampling and the analytic curves stay mutually consistent.
sample_size = 200
mean1 = 1
mean2 = 1
var1 = 1
var2 = 5
samp1 = np.random.normal(mean1, var1, sample_size)
samp2 = np.random.normal(mean2, var2, sample_size)
trace1a = go.Histogram(
x=samp1,
nbinsx=20,
name="Sample 1",
marker_color='blue',
opacity=0.7
)
trace2a = go.Histogram(
x=samp2,
nbinsx=50,
name="Sample 2",
marker_color='crimson',
opacity=0.7
)
# Wider x range to accommodate the high-variance sample.
x_axis = np.arange(-15, 15, 0.01)
y_samp1_pdf = norm.pdf(x_axis, mean1, var1)
y_samp2_pdf = norm.pdf(x_axis, mean2, var2)
y_samp1_cdf = norm.cdf(x_axis, mean1, var1)
y_samp2_cdf = norm.cdf(x_axis, mean2, var2)
trace1b = go.Scatter(
x=x_axis,
y=y_samp1_pdf,
marker = dict(color='blue'),
fill='tozeroy',
fillcolor='rgba(0, 0, 255, 0.2)',
name="Sample 1",
showlegend=False
)
trace2b = go.Scatter(
x=x_axis,
y=y_samp2_pdf,
marker = dict(color='red'),
fill='tozeroy',
fillcolor='rgba(255, 0, 0, 0.2)',
name="Sample 2",
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 Histograms', 'Sample 1 & 2 PDFs')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_traces(marker=dict(line=dict(width=1, color='black')))
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
barmode='overlay',
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis=dict(title="Count"),
xaxis2=dict(title='X'),
yaxis2=dict(title="Probability Density"),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
How does the t test perform in this case? Very poorly:
# Same means, different spreads: the t test compares means only, so it
# cannot detect this difference (large p expected).
t2, p2 = ttest_ind(samp1, samp2)
display(f"t: {t2}, p: {p2}")
It incorrectly concludes that there is no meaningful difference between these two samples. Now, what could be a way to identify this difference in variance? Recall the cumulative distribution function:
# Same two samples, now as cumulative histograms (left) alongside the analytic
# CDFs (right): the variance difference becomes clearly visible here.
trace1a = go.Histogram(
x=samp1,
nbinsx=20,
name="Sample 1",
marker_color='blue',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
trace2a = go.Histogram(
x=samp2,
nbinsx=20,
name="Sample 2",
marker_color='crimson',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
x_axis = np.arange(-15, 15, 0.01)
y_samp1_cdf = norm.cdf(x_axis, mean1, var1)
y_samp2_cdf = norm.cdf(x_axis, mean2, var2)
trace1b = go.Scatter(
x=x_axis,
y=y_samp1_cdf,
marker = dict(color='blue'),
name="Sample 1",
showlegend=False
)
trace2b = go.Scatter(
x=x_axis,
y=y_samp2_cdf,
marker = dict(color='red'),
name="Sample 2",
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 Cumulative Histograms', 'Sample 1 & 2 Cumulative Distribution Functions')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_traces(marker=dict(line=dict(width=1, color='black')))
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
barmode='overlay',
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis=dict(title="Count"),
xaxis2=dict(title='X'),
# NOTE(review): f-string has no placeholders and '\l' is not a recognized
# escape (kept literally); a raw string r'$P(X \leq x)$' would be cleaner.
yaxis2=dict(title=f'$P(X \leq x)$'),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
There is clearly a noticeable difference in the CDFs! That is where the KS score comes into play. It tries to find the maximum distance between the empirical CDFs. We can look at the empirical CDFs with the help of this function:
def empirical_cdf(x):
    """Return (sorted sample values, cumulative fractions) for plotting an ECDF.

    Fixed: the previous version called ``x.sort()``, silently reordering the
    caller's array in place; we now sort a copy and leave the input untouched.

    NOTE: the y values run 0..1 in steps of 1/(n-1); a textbook ECDF steps by
    1/n, but this variant matches the plots used throughout this notebook.
    Requires at least two data points (divides by n - 1).
    """
    xs = np.sort(x)
    n = xs.shape[0]
    return xs, np.arange(0, n, 1) / (n - 1)
Take a look below:
def empirical_cdf(x):
    """Returns the x and y values of the sample's (x) empirical cumulative distribution.

    - Note: In order to plot as a step function, must pass `line={'shape': 'hv'}` to plotly.
    - Fixed: previously called ``x.sort()``, mutating the caller's array in
      place; now sorts a copy and leaves the input untouched.
    - Note: y runs 0..1 in steps of 1/(n-1) (a textbook ECDF steps by 1/n);
      requires at least two data points.
    """
    xs = np.sort(x)
    return xs, np.arange(0, xs.shape[0], 1) / (xs.shape[0] - 1)
# Plot the two empirical CDFs as step functions; line={'shape': 'hv'} draws
# horizontal-then-vertical segments between the sorted sample points.
x_axis_samp1, emp_cdf_samp1 = empirical_cdf(samp1)
x_axis_samp2, emp_cdf_samp2 = empirical_cdf(samp2)
trace1 = go.Scatter(
x=x_axis_samp1,
y=emp_cdf_samp1,
line={'shape': 'hv'},
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp2,
y=emp_cdf_samp2,
line={'shape': 'hv'},
marker = dict(color='red'),
name="Sample 2"
)
data = [trace1, trace2]
layout = go.Layout(
width=700,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
title="Empirical CDF",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
Now, the KS score will find the maximum difference in height between the two curves above. This can be seen in the plot below:
def ks_score_implementation(x1, x2):
    """Compute the two-sample KS statistic and the ECDF points realizing it.

    Returns:
        d: max over all observed t of |F1_hat(t) - F2_hat(t)|.
        max_distance_points: ((x, F1_hat(x)), (x, F2_hat(x))) at the x that
            achieves d — used for plotting the vertical KS-distance segment.
    """
    # Sort each sample and build the combined input space of all observations.
    data1 = np.sort(x1)
    data2 = np.sort(x2)
    data_all = np.concatenate([data1, data2])
    # Evaluate each sample's empirical CDF at every point of the combined space.
    # cdf1_data and cdf2_data hold F_hat values at identical inputs (data_all),
    # so their element-wise difference compares heights at the same x.
    cdf1_data = empirical_dist_helper(data1, data_all)
    cdf2_data = empirical_dist_helper(data2, data_all)
    cdf_diffs = cdf1_data - cdf2_data
    minS = -np.min(cdf_diffs)
    maxS = np.max(cdf_diffs)
    d = max(minS, maxS)
    # Locate the index achieving the KS distance (for plotting only).
    # BUGFIX: the old code used max(-np.argmin(...), np.argmax(...)); since a
    # negated index is <= 0, it always picked argmax and returned the wrong
    # points whenever the largest CDF gap was on the negative side.
    if minS > maxS:
        d_idx = int(np.argmin(cdf_diffs))
    else:
        d_idx = int(np.argmax(cdf_diffs))
    max_distance_points = ((data_all[d_idx], cdf1_data[d_idx]), (data_all[d_idx], cdf2_data[d_idx]))
    return d, max_distance_points
def empirical_dist_helper(sample, t):
    """Empirical CDF of (sorted) `sample` evaluated at each point in `t`."""
    n = sample.shape[0]
    return np.searchsorted(sample, t, side='right') / n
# Re-plot the two ECDFs and highlight the pair of points realizing the KS distance.
x_axis_samp1, emp_cdf_samp1 = empirical_cdf(samp1)
x_axis_samp2, emp_cdf_samp2 = empirical_cdf(samp2)
trace1 = go.Scatter(
x=x_axis_samp1,
y=emp_cdf_samp1,
line={'shape': 'hv'},
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp2,
y=emp_cdf_samp2,
line={'shape': 'hv'},
marker = dict(color='red'),
name="Sample 2"
)
# Find points that yield largest distance (and hence highest KS Score), plot vertical line
_, ks_points = ks_score_implementation(samp1, samp2)
ks_point1, ks_point2 = ks_points
# Green marker at sample 1's ECDF height at the maximizing x value.
trace3 = go.Scatter(
x=[ks_point1[0]],
y=[ks_point1[1]],
marker = dict(
color = 'green',
size=6
),
mode="markers",
showlegend=False
)
# Green marker at sample 2's ECDF height at the same x value.
trace4 = go.Scatter(
x=[ks_point2[0]],
y=[ks_point2[1]],
marker = dict(
color = 'green',
size=6
),
mode="markers",
showlegend=False
)
# Vertical segment connecting the two markers; its length is the KS statistic.
trace5 = go.Scatter(
x=[ks_point1[0], ks_point2[0]],
y=[ks_point1[1], ks_point2[1]],
mode='lines',
marker = dict(
color = 'green',
size=6
),
name="KS score (max distance <br>between empirical cdfs)"
)
data = [trace1, trace2, trace3, trace4, trace5]
layout = go.Layout(
width=700,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
title="Empirical CDF",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
This distance is then used to determine the KS score, along with a p value relating to if the samples are indeed drawn from different populations. How does the KS score perform here?
display(ks_2samp(samp1, samp2))
It performs very well! It easily identifies that these samples are in fact drawn from different distributions.
Let's look at another example; assume that one of our samples is drawn from a bimodal distribution.
# Example 3: sample 1 is an even mixture of N(-1, 1) and N(3, 1) (bimodal),
# sample 2 is N(1, 1) — both end up with mean ~1.
# NOTE: samp1 has 2 * sample_size points (two concatenated draws), samp2 has sample_size.
sample_size = 1000
mean1a = -1
mean1b = 3
var1a = 1
var1b = 1
gaussian1a = np.random.normal(mean1a, var1a, sample_size)
gaussian1b = np.random.normal(mean1b, var1b, sample_size)
samp1 = np.concatenate((gaussian1a, gaussian1b))
mean2 = 1
var2 = 1
samp2 = np.random.normal(mean2, var2, sample_size)
trace1a = go.Histogram(
x=samp1,
nbinsx=50,
name="Sample 1",
marker_color='blue',
opacity=0.7
)
trace2a = go.Histogram(
x=samp2,
nbinsx=25,
name="Sample 2",
marker_color='crimson',
opacity=0.7
)
# Right subplot: cumulative versions of the same histograms.
trace1b = go.Histogram(
x=samp1,
nbinsx=20,
name="Sample 1",
marker_color='blue',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
trace2b = go.Histogram(
x=samp2,
nbinsx=20,
name="Sample 2",
marker_color='crimson',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 Histograms', 'Sample 1 & 2 Cumulative Histograms')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_traces(marker=dict(line=dict(width=1, color='black')))
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
barmode='overlay',
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis=dict(title="Count"),
xaxis2=dict(title='X'),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
However, both samples still have approximately the same overall mean of 1:
# Both samples share (approximately) the same mean despite very different shapes.
display(f'Mean of Sample 1: {samp1.mean()}' )
display(f'Mean of Sample 2: {samp2.mean()}')
But again, we can clearly see that they are indeed drawn from different populations, via the PDF and CDFs below:
# Analytic PDF/CDF of the equal-weight gaussian mixture vs the single gaussian.
x_axis = np.arange(-15, 15, 0.01)
y_samp1_pdf_func = lambda x: 0.5 * norm.pdf(x, mean1a, var1a) + 0.5 * norm.pdf(x, mean1b, var1b)
y_samp1_cdf_func = lambda x: 0.5 * norm.cdf(x, mean1a, var1a) + 0.5 * norm.cdf(x, mean1b, var1b)
y_samp1_pdf = y_samp1_pdf_func(x_axis)
y_samp1_cdf = y_samp1_cdf_func(x_axis)
# Assert that our distribution has been normalized correctly (i.e. the area sums to 1)
assert round(integrate.quad(y_samp1_pdf_func, -np.inf, np.inf)[0] - 1, 8) == 0.0
y_samp2_pdf = norm.pdf(x_axis, mean2, var2)
y_samp2_cdf = norm.cdf(x_axis, mean2, var2)
trace1a = go.Scatter(
x=x_axis,
y=y_samp1_pdf,
marker = dict(color='blue'),
fill='tozeroy',
fillcolor='rgba(0, 0, 255, 0.2)',
name="Sample 1"
)
trace2a = go.Scatter(
x=x_axis,
y=y_samp2_pdf,
marker = dict(color='red'),
fill='tozeroy',
fillcolor='rgba(255, 0, 0, 0.2)',
name="Sample 2"
)
trace1b = go.Scatter(
x=x_axis,
y=y_samp1_cdf,
marker = dict(color='blue'),
name="Sample 1",
showlegend=False
)
trace2b = go.Scatter(
x=x_axis,
y=y_samp2_cdf,
marker = dict(color='red'),
name="Sample 2",
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 PDFs', 'Sample 1 & 2 CDFs')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis1=dict(title="Probability Density"),
xaxis2=dict(title='X'),
yaxis2=dict(title="Cumulative Probability"),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
So, how does the t test perform here?
# t test on the bimodal-vs-unimodal pair: the means match, so it finds nothing.
t2, p2 = ttest_ind(samp1, samp2)
display(f"t: {t2}, p: {p2}")
Again, it fails to distinguish the differences between our two distributions (since they have the same mean!). Let's check the KS Score:
ks_2samp(samp1, samp2)
Again the KS score is able to confidently distinguish the difference between the two samples. Visually it looks like:
# ECDFs of the bimodal example with the KS-distance segment highlighted.
x_axis_samp1, emp_cdf_samp1 = empirical_cdf(samp1)
x_axis_samp2, emp_cdf_samp2 = empirical_cdf(samp2)
trace1 = go.Scatter(
x=x_axis_samp1,
y=emp_cdf_samp1,
line={'shape': 'hv'},
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp2,
y=emp_cdf_samp2,
line={'shape': 'hv'},
marker = dict(color='red'),
name="Sample 2"
)
# Find points that yield largest distance (and hence highest KS Score), plot vertical line
_, ks_points = ks_score_implementation(samp1, samp2)
ks_point1, ks_point2 = ks_points
# Green markers: each ECDF's height at the maximizing x value.
trace3 = go.Scatter(
x=[ks_point1[0]],
y=[ks_point1[1]],
marker = dict(
color = 'green',
size=6
),
mode="markers",
showlegend=False
)
trace4 = go.Scatter(
x=[ks_point2[0]],
y=[ks_point2[1]],
marker = dict(
color = 'green',
size=6
),
mode="markers",
showlegend=False
)
# Vertical segment connecting the markers; its length is the KS statistic.
trace5 = go.Scatter(
x=[ks_point1[0], ks_point2[0]],
y=[ks_point1[1], ks_point2[1]],
mode='lines',
marker = dict(
color = 'green',
size=6
),
name="KS score (max distance <br>between empirical cdfs)"
)
data = [trace1, trace2, trace3, trace4, trace5]
layout = go.Layout(
width=700,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
title="Empirical CDF",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
Now that we have taken the time to motivate the need behind the KS score, let's take a look under the hood to see how it actually is implemented.
We can do this via an example. To start, we can generate two samples that are indeed from two separate distributions; we will sample from a gaussian that has a mean of 0 and variance of 1, and a second gaussian that has a mean of 1 and variance of 1. We will call these two data generating distributions group 1 and group 2 respectively. These population distributions are shown below:
# Under-the-hood walkthrough setup: the two population ("group") distributions,
# N(0, 1) and N(1, 1), shown as analytic PDFs and CDFs.
sample_size = 200
mean1 = 0
mean2 = 1
var1 = 1
var2 = 1
x_axis = np.arange(-3, 4, 0.01)
y_samp1_pdf = norm.pdf(x_axis, mean1, var1)
y_samp2_pdf = norm.pdf(x_axis, mean2, var2)
y_samp1_cdf = norm.cdf(x_axis, mean1, var1)
y_samp2_cdf = norm.cdf(x_axis, mean2, var2)
trace1a = go.Scatter(
x=x_axis,
y=y_samp1_pdf,
marker = dict(color='blue'),
fill='tozeroy',
fillcolor='rgba(0, 0, 255, 0.2)',
name="Group 1"
)
trace2a = go.Scatter(
x=x_axis,
y=y_samp2_pdf,
marker = dict(color='crimson'),
fill='tozeroy',
fillcolor='rgba(255, 0, 0, 0.2)',
name="Group 2"
)
trace1b = go.Scatter(
x=x_axis,
y=y_samp1_cdf,
marker = dict(color='blue'),
name="",
showlegend=False
)
trace2b = go.Scatter(
x=x_axis,
y=y_samp2_cdf,
marker = dict(color='crimson'),
name="",
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Group 1 & 2 PDFs', 'Group 1 & 2 CDFs')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis1=dict(title="Probability Density"),
xaxis2=dict(title='X'),
yaxis2=dict(title="Cumulative Probability"),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
Now we can sample 200 data points from each group and plot the resulting histograms:
# Draw 200 points from each group; compare plain vs cumulative histograms.
samp1 = np.random.normal(mean1, var1, sample_size)
samp2 = np.random.normal(mean2, var2, sample_size)
trace1a = go.Histogram(
x=samp1,
nbinsx=20,
name="Sample 1",
marker_color='blue',
opacity=0.7
)
trace2a = go.Histogram(
x=samp2,
nbinsx=20,
name="Sample 2",
marker_color='crimson',
opacity=0.7
)
trace1b = go.Histogram(
x=samp1,
nbinsx=20,
name="Sample 1",
marker_color='blue',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
trace2b = go.Histogram(
x=samp2,
nbinsx=20,
name="Sample 2",
marker_color='crimson',
opacity=0.7,
cumulative_enabled=True,
showlegend=False
)
fig = plotly.subplots.make_subplots(
rows=1,
cols=2,
print_grid=False,
subplot_titles=('Sample 1 & 2 Histograms', 'Sample 1 & 2 Cumulative Histograms')
)
fig.append_trace(trace1a, 1, 1)
fig.append_trace(trace2a, 1, 1)
fig.append_trace(trace1b, 1, 2)
fig.append_trace(trace2b, 1, 2)
fig.update_traces(marker=dict(line=dict(width=1, color='black')))
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
layout = go.Layout(
barmode='overlay',
width=950,
height=450,
xaxis1=dict(title="X"),
yaxis=dict(title="Count"),
xaxis2=dict(title='X'),
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)'
)
fig.update_layout(
layout
)
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
Because we know that sample 1 and sample 2 were in fact drawn from different populations (group 1 and group 2), our goal is for a hypothesis test to also come to that same conclusion. Using the out of the box KS test from scipy, we can see if it manages to classify them as being generated by different populations:
display(ks_2samp(samp1, samp2))
And as expected, it does! The question is how does ks_2samp work under the hood? We have briefly discussed that the test works by calculating the maximum distance between the respective samples' empirical CDFs. But, as we will see, that requires some thinking due to indexing and implementation issues arising from the nature of empirical CDFs and how we write them programmatically.
Before we can dig into the actual ks score implementation, it is worth defining what exactly an empirical distribution function is. Simply, it is:
An empirical distribution function, $\hat{F}$, is an estimate of the cumulative distribution function, $F$, that generated the points in the sample. It is a step function that jumps up by $\frac{1}{n}$ for each of the $n$ data points in the sample. Its value at any specified value of the measured variable is the fraction of observations of the measured variable that are less than or equal to the specified value.
Mathematically, that looks like:
$$\hat{F}_n(t) = \frac{\text{number of elements in the sample } \leq t}{n} = \frac{1}{n} \sum_{i=1}^n \textbf{1}_{X_i \leq t}$$Where $\textbf{1}$ is an indicator function, and our sample contains $X_1,\dots,X_n$. Visually, if we had a sample size of $10$, our EDF would look like:

Note in the above diagram that sorting our $x$ observations clearly makes this function much easier to implement. Let's demonstrate in code how exactly this works. However, before we do there are a few things worth stopping to dig into at this point. First and foremost, we must keep in mind the inherent differences between
the x data structure vs. the actual $x$ value when plotting functions. Failure to remain aware of the above distinctions can lead to confusion. An example should make this more concrete; there are multiple ways to implement an EDF so that the final resulting plot is a nice step function as we expect. To start, we can implement our function exactly as it mathematically is defined. This could look like:
def empirical_cdf_v1(t, X):
    """Returns the EDF for a single input data point t. This requires the entire X sample."""
    # Count how many sample points fall at or below t, then scale by 1/n.
    n_at_or_below = (X <= t).sum()
    return n_at_or_below * (1 / X.shape[0])
We can see it's resulting plot below for a sample size of 10:
def empirical_cdf_v1(t, X):
    """Fraction of sample X lying at or below t (the EDF evaluated at t)."""
    count_below = (X <= t).sum()
    return count_below * (1 / X.shape[0])
# Evaluate the V1 EDF over a dense grid for two 10-point samples, then plot;
# the output is (approximately) continuous because the grid is.
samp_a = np.random.normal(mean1, var1, 10)
samp_b = np.random.normal(mean2, var2, 10)
x_axis = np.arange(-3, 3, 0.001)
emp_cdf_v1_samp_a = []
for t in x_axis:
    emp_cdf_v1_samp_a.append(empirical_cdf_v1(t, samp_a))
emp_cdf_v1_samp_b = []
for t in x_axis:
    emp_cdf_v1_samp_b.append(empirical_cdf_v1(t, samp_b))
trace1 = go.Scatter(
x=x_axis,
y=emp_cdf_v1_samp_a,
marker = dict(color='blue'),
name="Sample a"
)
trace2 = go.Scatter(
x=x_axis,
y=emp_cdf_v1_samp_b,
marker = dict(color='red'),
name="Sample b"
)
data = [trace1, trace2]
layout = go.Layout(
width=650,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
title="Empirical CDF - Implementation V1",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
Note that this is following the standard plotting convention; we created an evenly spaced $x$ input (via x_axis = np.arange(-3, 3, 0.001)), passed that through our implementation of $\hat{F}$ (empirical_cdf_v1), and received an output of the same shape. In this case our programmatic implementation was essentially a perfect match to our mathematical function $\hat{F}$, and our plotting library, plotly, had everything it needed to create a plot of our EDF. Of course, this implementation isn't a perfectly continuous function, our input x_axis is still discrete with steps of 0.001; however, for all intents and purposes it is close enough. To get a bit more technical we mapped from the domain of $\hat{F}$, $t$, to the image of $\hat{F}$, $\hat{F}(t)$; this image is a subset of the codomain of $\hat{F}$, namely $[0, 1]$.
Visually this looks like:

Where we have an infinite number of arrows mapping from $t$ to $\hat{F}$ (again, of course in the actual implementation we do not have an infinite number of evenly spaced data points, but that is what our intent was).
Now, there is another approach that we could have taken (and this is where the distinction I mentioned can become tricky)! Looking at our first implementation, empirical_cdf_v1, notice that for every input $t$ we need to work with the entire sample $X$. I utilized a vector operation (to calculate the indicator function across all sample points, and the final sum), but the point remains that as we pass in our values of $t$, each time we must work with all of $X$ and determine how many points lie below $t$. This is clearly inefficient, and we can actually solve an equivalent data problem that will squeeze out the majority of this inefficiency. Take a look at the following implementation:
def empirical_cdf_v2(x):
    """Sorted-sample EDF: returns (sorted sample values, cumulative fractions).

    Fixed: previously called ``x.sort()``, mutating the caller's array in
    place; now sorts a copy and leaves the input untouched.
    (y steps by 1/(n-1) from 0 to 1, matching the notebook's earlier
    empirical_cdf; requires at least two data points.)
    """
    xs = np.sort(x)
    return xs, np.arange(0, xs.shape[0], 1) / (xs.shape[0] - 1)
This will return our sorted sample $X$, and the corresponding $\hat{F}(X)$ for each sample point. Notice that this is no longer a function of $t$! $\hat{F}$ is now only defined for the data points in our sample.
Now, this is a far more efficient implementation, requiring our sample to be sorted only one time! However, it is very important to remember that this means that our implementation is not lining up with the mathematical definition of $\hat{F}$ perfectly; $\hat{F}$ is defined for $t = \mathbb{R}$, while empirical_cdf_v2 is only defined for points within our sample, i.e. $X \subset t$. In other words, it is currently a discrete function. Visually this looks like:

Now, unlike empirical_cdf_v1 which is discrete but in a negligible way, empirical_cdf_v2 is discrete in a more drastic way. Let's take a look at a plot of this implementation, without telling plotly to connect our data points via a line:
def empirical_cdf_v2(x):
    """Sorted-sample EDF: returns (sorted sample values, cumulative fractions).

    Fixed: previously called ``x.sort()``, mutating the caller's array in
    place; now sorts a copy. y steps by 1/(n-1); needs >= 2 data points.
    """
    xs = np.sort(x)
    return xs, np.arange(0, xs.shape[0], 1) / (xs.shape[0] - 1)
# The V2 EDF is only defined at the sample points themselves: plot raw markers
# to make the gaps visible (mode='markers' overrides the line shape here).
x_axis_samp_a, emp_cdf_v2_samp_a = empirical_cdf_v2(samp_a)
x_axis_samp_b, emp_cdf_v2_samp_b = empirical_cdf_v2(samp_b)
trace1 = go.Scatter(
x=x_axis_samp_a,
y=emp_cdf_v2_samp_a,
line={'shape': 'hv'},
mode='markers',
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp_b,
y=emp_cdf_v2_samp_b,
line={'shape': 'hv'},
mode='markers',
marker = dict(color='red'),
name="Sample 2"
)
data = [trace1, trace2]
layout = go.Layout(
width=650,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
title="Empirical CDF - Implementation V2 (no lines)",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
This is what I mean when I say our function returned data that is noticeably discrete; our resulting scatter plot is not evenly spaced and has relatively large gaps. Now, we can leverage our plotting library to help us out here. If we pass mode='lines' to go.Scatter we can get a bit closer to the EDF we desire:
# Same V2 data with mode='lines': points get joined by straight diagonal
# segments rather than the step function we actually want.
trace1 = go.Scatter(
x=x_axis_samp_a,
y=emp_cdf_v2_samp_a,
mode='lines',
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp_b,
y=emp_cdf_v2_samp_b,
mode='lines',
marker = dict(color='red'),
name="Sample 2"
)
data = [trace1, trace2]
layout = go.Layout(
width=650,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
# NOTE(review): title says "V1" but this plots the V2 implementation's data.
title="Empirical CDF - Implementation V1 (Diagonal Lines)",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
That is not quite what we were after! Our data points were connected via a diagonal line, yet what we actually wanted was a step function. This can be achieved via passing line={'shape': 'hv'} to go.Scatter:
# Passing line={'shape': 'hv'} joins the V2 data points as a step function —
# the desired EDF shape, with plotly filling the gaps between sample points.
trace1 = go.Scatter(
x=x_axis_samp_a,
y=emp_cdf_v2_samp_a,
line={'shape': 'hv'},
mode='lines',
marker = dict(color='blue'),
name="Sample 1"
)
trace2 = go.Scatter(
x=x_axis_samp_b,
y=emp_cdf_v2_samp_b,
line={'shape': 'hv'},
mode='lines',
marker = dict(color='red'),
name="Sample 2"
)
data = [trace1, trace2]
layout = go.Layout(
width=650,
height=400,
paper_bgcolor='rgba(0,0,0,0)',
plot_bgcolor='rgba(0,0,0,0)',
# NOTE(review): title says "V1" but this plots the V2 implementation's data.
title="Empirical CDF - Implementation V1 (Step func)",
xaxis=dict(title="X"),
yaxis=dict(title="Cumulative Probability")
)
fig = go.Figure(data=data, layout=layout)
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#f1f1f1')
plot_util(fig)
# plotly.offline.iplot(fig)
# html_fig = plotly.io.to_html(fig, include_plotlyjs=True)
# display(HTML(html_fig))
That looks much better! The thing to keep in mind though is that we are still not defined for all values of $X$. We have leveraged our plotting library to help our plot match up with what we expected for $\hat{F}$. In other words, the gaps/missing data points are simply filled in with a line by plotly; however, we do not actually have those data points stored in any sort of array anywhere — our $X$ and $\hat{F}$ arrays only store data points from our sample.
With that said, due to the drastic increase in performance, empirical_cdf_v2 will be more favorable from an implementation standpoint; however, empirical_cdf_v2 is not an exact implementation of $\hat{F}$ and we must remain aware of these differences.
A big reason for taking the time to dissect the differences between the implementations of empirical_cdf_v1 and empirical_cdf_v2 is that, left unexamined, they will lead to confusion when we actually try to implement the KS score. Remember, the KS score is finding the largest difference in height between our two sample EDFs.
Let's see how that works if we use empirical_cdf_v1:
# Evaluate both V1 EDFs over a dense, shared input grid, then take the largest
# (signed) height difference between the two curves — the element-wise
# comparison is valid because each index corresponds to the same x value.
x_axis = np.arange(-3, 3, 0.001)
emp_cdf_v1_samp1 = np.array([empirical_cdf_v1(t, samp1) for t in x_axis])
emp_cdf_v1_samp2 = np.array([empirical_cdf_v1(t, samp2) for t in x_axis])
(emp_cdf_v1_samp1 - emp_cdf_v1_samp2).max()
# Repeat the dense-grid V1 EDF comparison, this time reporting the result
# alongside scipy's KS statistic — the two agree because both measure the
# maximum height gap at common x values.
x_axis = np.arange(-3, 3, 0.001)
emp_cdf_v1_samp1 = np.array([empirical_cdf_v1(t, samp1) for t in x_axis])
emp_cdf_v1_samp2 = np.array([empirical_cdf_v1(t, samp2) for t in x_axis])
display(f'Max Distance (custom implementation): {round((emp_cdf_v1_samp1 - emp_cdf_v1_samp2).max(), 2)}')
display(f'Max Distance (returned from scipy KS implementation): {ks_2samp(samp1, samp2)[0]}')
And as expected when calculating the difference between our two EDFs we ended up with 0.46, exactly what was yielded from ks_2samp. Now let's see what the max difference is when we use empirical_cdf_v2:
# Element-wise difference of the two V2 EDF y arrays: for equal-size samples
# both y arrays are the same 0..1 ramp, so the max is 0.0 — matching indices
# no longer correspond to the same x value.
x_axis_samp_a, emp_cdf_v2_samp_a = empirical_cdf_v2(samp_a)
x_axis_samp_b, emp_cdf_v2_samp_b = empirical_cdf_v2(samp_b)
(emp_cdf_v2_samp_a - emp_cdf_v2_samp_b).max()
Interesting — taking the difference between the two EDFs yielded from empirical_cdf_v2 we get a maximum difference of 0.0. This clearly is not what we were looking for. Is there an error in our implementation? Not exactly; this comes back to the distinctions highlighted earlier, particularly the failure to keep them at front of mind.
Our empirical_cdf_v2 is not implemented like a standard function we are used to plotting (such as empirical_cdf_v1). It is not calculated across a continuous $x$ interval; rather, it is calculated only for the data points in the sample. Generally, because we calculate functions (and plot them) based on an (approximately) continuous $x$ interval, any two functions will:
And this brings us to the issue of indexing that I mentioned earlier. Because empirical_cdf_v2 is a function of our sample data and not $t$, unless our sample data is identical for both sets, our indices will not align with the same $x$ values as they usually do! An example will make this more clear. Consider the case that we generally find ourselves in — we are plotting against a continuous, evenly spaced input space. Below it is shown as the integers $[1, 10]$ (but we can think of it as all real numbers in $[1, 10]$):

Because both $f$ and $g$ are evaluated across the same $x$ input, their indices end up matching up perfectly to the same x value!

Above it is clear that if we try and access the value of $f$ and $g$ at the index $4$, via f[4] and g[4], it is equivalent to evaluating $f(5)$ and $g(5)$. Put another way, the index 4 corresponds to the $x$ value of $5$ for each function domain.
Now, what happens when we start working with a function whose domain is only that of our sample(s)? Say our samples are referred to as $x_1$ and $x_2$, each consisting of $5$ data points:

In this case our indices do not have a natural correspondence! If we now try to access the value of $f$ and $g$ at index 4, we would in turn be evaluating $f(2.51)$ and $g(2.68)$. If we then try to perform an element-wise subtraction such as f[4] - g[4], we are actually doing $f(2.51) - g(2.68)$.

Clearly this is not what we intended! We want to be evaluating the difference of $f$ and $g$ at the same $x$ value-not necessarily at the same index! In general evaluating at the index is simply convenient because the indices line up with the $x$ domain of each function. That is no longer the case.
At this point we should have a good understanding of the problem, but before going to the solution let's summarize what we just walked through:
To summarize: (1) empirical_cdf_v1 and empirical_cdf_v2 are meant to represent the same mathematical function, but empirical_cdf_v2 works only with a subset of the data (the sample points themselves); we want to work with empirical_cdf_v2 because it is more computationally efficient. (2) Because the plots of empirical_cdf_v1 and empirical_cdf_v2 look identical, we are fooled into thinking empirical_cdf_v2 is indeed continuous. (3) When we take the index-wise difference of the two EDFs produced by empirical_cdf_v2, we find that there is no difference! This is because we are only working with a subset of the data, and hence our indexing will not match up with the domains of each respective EDF. In other words, if our two samples do not contain the exact same $X$ data points, then equal indices of the two EDF arrays do not correspond to evaluating the EDFs at the same $X$ input. With all of the above front of mind, we have a firm footing and are ready to move on to the actual implementation. Our starting point is as follows; we know that the correct KS score (maximum distance between EDFs) is:
display(ks_2samp(samp1, samp2))
We know that we may need to look out for indexing complications, and that we are working with samp1 and samp2 from earlier. To start, a key insight is that we do not need to calculate $\hat{F}$ for all values of $t$. Computationally it isn't necessary since $\hat{F}$ is a step function! We only need to calculate the EDFs of each sample against the overall sample space. So, if we define samp1 as the set $x_1$, samp2 as the set $x_2$, then their union, $x_1 \cup x_2$, is the overall sample space.
Before getting started let's define a function that has the same outcome as $\hat{F}$, but is far faster. To do so, recall that $\hat{F}$ simply takes in a value, $t$, and a sample, $x$, and determines what fraction of data points in the sample are less than or equal to $t$. One effective implementation may be:
def F_hat_of_t(t, x):
    """Empirical distribution function F_hat evaluated at ``t``.

    Returns the fraction of data points in the sample ``x`` that are <= ``t``.

    Parameters
    ----------
    t : float or array-like
        Point(s) at which F_hat is evaluated; ``np.searchsorted`` accepts
        either a scalar or an array of query points.
    x : array-like
        Sample used in the evaluation. Left unmodified.
    """
    # Sort a COPY: the original called x.sort(), mutating the caller's array
    # in place, and then referenced an undefined name `sample` (NameError).
    ordered = np.sort(x)
    n = len(ordered)
    # Right-side insertion index == number of sample points <= t.
    return np.searchsorted(ordered, t, side='right') / n
def F_hat_of_t(t, x):
    """Empirical distribution function F_hat evaluated at ``t``.

    Returns the fraction of data points in sample ``x`` that are <= ``t``.
    ``t`` may be a scalar or an array of evaluation points (np.searchsorted
    handles both).
    """
    # np.sort returns a sorted copy; the original x.sort() silently mutated
    # the caller's sample in place as a hidden side effect.
    ordered = np.sort(x)
    n = len(ordered)
    # Right-side insertion index of t == count of points <= t; divide by n
    # to turn the count into a fraction.
    return np.searchsorted(ordered, t, side='right') / n
Above we simply sort the sample, determine its length, and then use np.searchsorted to find the index where $t$ would be inserted to preserve the ascending order of $x$. This index is then divided by $n$, the sample size, yielding the fraction of sample points less than or equal to $t$.
We can step through this as follows; assume our sample is x = np.array([20, 4, 3, 10]). We first sort our sample:
# NOTE(review): this `x` is whatever an earlier cell last bound it to; the
# sort result is superseded by the reassignment on the next line -- confirm
# this first sort is intentional.
x.sort()
# Recreate the example sample from the text, already in ascending order.
x = np.array([3,4,10,20])
x.sort()
# Display the sorted sample (a no-op sort here, mirroring F_hat_of_t's step).
display(f'x sorted: {x}')
Then, let $t=15$. We know that a correct implementation of $\hat{F}$ would determine that 75% of values in the sample were less than or equal to $15$. Using np.searchsorted we have a return value of:
# side='right' gives the insertion index that keeps x sorted, i.e. the count
# of elements <= 15 (3 of the 4 points here).
np.searchsorted(x, 15, side="right")
display(f'Return value of np.searchsorted: {np.searchsorted(x, 15, side="right")}')
The return value of 3 indicates that $15$ would need to be inserted at index = 3 in order to preserve order. If we divide this value by the size of our sample, $4$, we do in fact arrive at the correct output, 75%. What we have done is solve an equivalent problem. At this point we can take in any value of $t$ and determine $\hat{F}(t)$. The next step is to take in our entire sample space, $x_1 \cup x_2$, and determine the respective $\hat{F}_1(t)$ and $\hat{F}_2(t)$ evaluated over that space. Note, the reason for the subscript on $\hat{F}$ is that the EDF is in fact specific and distinct based on what sample it is evaluating: $\hat{F}_1(t)$ is evaluated for a given value of $t$ with an internal sample of $x_1$, and $\hat{F}_2(t)$ is evaluated for a given value of $t$ with an internal sample of $x_2$.
Now we have managed to build a function that mimics $\hat{F}(t)$, the next step is to simple evaluate this function across our entire observed sample space. We could achieve that with a simple for loop:
def ks_score_custom_1(x1, x2):
    """Two-sample KS statistic via a per-point loop over the pooled sample.

    Evaluates each sample's EDF (F_hat_of_t) at every observed data point in
    x1 U x2 and returns the largest absolute vertical gap between them.

    Parameters
    ----------
    x1, x2 : array-like
        The two samples being compared.

    Returns
    -------
    float
        max over pooled t of |F_hat_1(t) - F_hat_2(t)|.
    """
    # The EDFs only change height at sample points, so the pooled sample is
    # the only input space we need to evaluate.
    x_union = np.concatenate([x1, x2])
    edf_x1 = np.array([F_hat_of_t(t, x1) for t in x_union])
    edf_x2 = np.array([F_hat_of_t(t, x2) for t in x_union])
    # Absolute difference: the original one-sided max(edf_x1 - edf_x2) only
    # equals the KS statistic when F_hat_1 lies above F_hat_2 somewhere; this
    # also makes the function consistent with ks_score_custom_2.
    d = np.abs(edf_x1 - edf_x2).max()
    return d
def ks_score_custom_1(x1, x2):
    """Two-sample KS statistic computed with an explicit loop.

    For every point t in the pooled sample x1 U x2 (the only places either
    EDF changes height), evaluate both EDFs via F_hat_of_t and return the
    maximum absolute difference between them.

    Returns
    -------
    float
        The KS distance max_t |F_hat_1(t) - F_hat_2(t)|.
    """
    x_union = np.concatenate([x1, x2])  # pooled observed sample space
    edf_x1 = np.array([F_hat_of_t(t, x1) for t in x_union])
    edf_x2 = np.array([F_hat_of_t(t, x2) for t in x_union])
    # Take |.| so the statistic is two-sided regardless of which EDF is on
    # top (matches ks_score_custom_2's max(-min, max) formulation).
    d = np.abs(edf_x1 - edf_x2).max()
    return d
display(f'Custom KS score with for loop: {ks_score_custom_1(samp1, samp2)}')
And we arrived at exact same value as the scipy implementation! However, we can in fact make this more efficient. We can leverage the fact that np.searchsorted can take a list of comparators. This means that instead of passing a single $t$ value to F_hat_of_t, we can pass an array of $t$ values. Namely, we can pass the entire observed sample input space:
Now, our final objective is to calculate the empirical distribution value for all possible values of our sample space! In other words, we want to take all values in samp1 and samp2 and run them through our EDF helper. Thankfully, np.searchsorted can take a list of comparators. A final implementation will look like:
def ks_score_custom_2(x1, x2):
    """Vectorized two-sample KS score.

    Passes the entire pooled sample space to F_hat_of_t in one call per
    sample (np.searchsorted accepts an array of query points) and returns
    the largest vertical gap between the two EDFs in either direction.
    """
    pooled = np.concatenate([x1, x2])
    # Signed height differences of the two EDFs at every pooled point.
    diffs = F_hat_of_t(pooled, x1) - F_hat_of_t(pooled, x2)
    # Largest gap where EDF2 sits above EDF1, and where EDF1 sits above EDF2.
    return max(-diffs.min(), diffs.max())
def ks_score_custom_2(x1, x2):
    """Compute the two-sample KS statistic in a single vectorized pass.

    Each EDF is evaluated at every point of the combined sample space with
    one F_hat_of_t call; the score is the maximum absolute height gap.
    """
    sample_space = np.concatenate([x1, x2])
    edf1 = F_hat_of_t(sample_space, x1)
    edf2 = F_hat_of_t(sample_space, x2)
    gap = edf1 - edf2
    upward = np.max(gap)     # where EDF1 is above EDF2
    downward = -np.min(gap)  # where EDF2 is above EDF1
    return max(downward, upward)
display(f'Custom KS score vectorized: {ks_score_custom_2(samp1, samp2)}')
Again we arrive at the same result seen via the scipy implementation. We have managed to successfully implement a function that can find the max distance between two EDFs, just as the KS score does.
# Reference output from scipy: (D statistic, p-value).
display(ks_2samp(samp1, samp2))
# Critical KS distance: D_crit = c * sqrt((n1 + n2) / (n1 * n2)) with
# c = sqrt(-0.5 * ln(0.05)) and n1 = n2 = 200.
# NOTE(review): the standard two-sided coefficient is sqrt(-ln(alpha/2)/2);
# using ln(0.05) directly corresponds to a different alpha -- confirm which
# significance level is intended here.
np.sqrt(-0.5 * np.log(0.05)) * np.sqrt((200 + 200) / (200 * 200))
def p_value(d, n1, n2, upper):
    """Asymptotic two-sided p-value for the two-sample KS statistic.

    Implements the truncated Kolmogorov series
        p = 2 * sum_{i=1}^{upper-1} (-1)**(i-1) * exp(-2 * i**2 * z**2),
    where z = d * sqrt(n1*n2 / (n1 + n2)) is the scaled statistic.

    Parameters
    ----------
    d : float
        Observed KS distance between the two EDFs.
    n1, n2 : int
        Sizes of the two samples.
    upper : int
        Exclusive upper bound on the summation index (range(1, upper)),
        i.e. upper - 1 terms of the alternating series are summed.

    Returns
    -------
    float
        Approximate p-value, clamped to [0, 1]; the truncated alternating
        series can overshoot those bounds for small z (e.g. d = 0 would
        otherwise yield 2).
    """
    z = d * np.sqrt((n1 * n2) / (n1 + n2))
    total_summation = 0.0
    for i in range(1, upper):
        current_iteration_sum = (-1)**(i-1) * np.e**(-2*i**2 * z**2)
        total_summation += current_iteration_sum
    # Clamp: any value outside [0, 1] is a truncation artifact, not a p-value.
    return min(1.0, max(0.0, 2 * total_summation))
# Alternative inputs from a different experiment, kept for reference:
# d = 0.256168
# n1 = 4363
# n2 = 50
# KS distance and sample sizes for the samp1/samp2 comparison.
# NOTE(review): d = 0.515 is presumably the D statistic reported by ks_2samp
# above -- confirm against that output.
d = 0.515
n1 = 200
n2 = 200
# range(1, upper) inside p_value sums upper - 1 = 2 series terms; for z this
# large the terms decay extremely fast, so 2 terms is plenty.
upper = 3
p_value(d, n1, n2, upper)